package cn.hetra.hj212.service;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import cn.hetra.hj212.core.HJ212Data;
import cn.hetra.hj212.session.SessionImpl;
import io.netty.channel.Channel;
import io.netty.channel.ChannelId;
import io.netty.util.AttributeKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Service;

import java.util.Optional;
import java.util.Set;
import java.util.concurrent.*;

/**
 * Manages the sessions with on-site machines (field devices).
 *
 * <p>Two registries are kept: every tracked connection by its {@link ChannelId}, and the
 * channel registered for each MN (the value stored under {@code HJ212Data.MN_ATTR} on the
 * channel). Both registries are {@link ConcurrentHashMap}s.
 *
 * <p>The executors are created in {@link #afterPropertiesSet()}; until that method has run,
 * {@link #getScheduler()} returns {@code null} and {@link #registerMN(Channel)} cannot
 * schedule session creation.
 */
public class RemotingService implements InitializingBean {

    /** Channel attribute under which the {@link SessionImpl} of a registered channel is stored. */
    public static final AttributeKey<SessionImpl> SESSION_KEY = AttributeKey.valueOf("SESSION");
    private static final Logger LOGGER = LoggerFactory.getLogger(RemotingService.class);

    /** Every tracked connection, keyed by channel id. */
    private final ConcurrentMap<ChannelId, Channel> connections = new ConcurrentHashMap<>();
    /** The channel registered for each MN. */
    private final ConcurrentMap<String, Channel> clients = new ConcurrentHashMap<>();

    /** Executor handed to every {@link SessionImpl}; threads are named {@code response-executor-N}. */
    ThreadPoolExecutor threadPoolExecutor;

    /** Scheduler used for the delayed session creation and handed to every {@link SessionImpl}. */
    private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;

    /**
     * Returns the shared scheduler.
     *
     * @return the scheduler, or {@code null} if {@link #afterPropertiesSet()} has not run yet
     */
    public ScheduledThreadPoolExecutor getScheduler() {
        return scheduledThreadPoolExecutor;
    }

    /**
     * Looks up the session of the channel registered for the given MN.
     *
     * @param mn the MN to look up
     * @return the session, or an empty {@code Optional} when no channel is registered for
     *         {@code mn} or the registered channel has no session stored under
     *         {@link #SESSION_KEY} yet
     */
    public Optional<SessionImpl> getSession(String mn) {
        return Optional.ofNullable(clients.get(mn))
                .map(registered -> registered.attr(SESSION_KEY).get());
    }

    /**
     * Returns the MNs that currently have a registered channel.
     *
     * @return the key set of the internal registry; this is a live view, not a snapshot
     */
    public Set<String> getMNs() {
        return clients.keySet();
    }

    /**
     * Returns the ids of all tracked connections.
     *
     * @return the key set of the internal registry; this is a live view, not a snapshot
     */
    public Set<ChannelId> getConnections() {
        return connections.keySet();
    }

    /**
     * Registers {@code channel} for the MN stored under {@code HJ212Data.MN_ATTR}, unless a
     * channel is already registered for that MN.
     *
     * <p>On a new registration the session is not created immediately: a task is scheduled
     * 500 ms later that creates the {@link SessionImpl}, stores it under {@link #SESSION_KEY}
     * and calls {@code startInit()} on it.
     *
     * @param channel the channel to register; its MN attribute must already be set, because
     *                the underlying map rejects {@code null} keys
     * @return whatever is currently stored under {@link #SESSION_KEY} on {@code channel}; this
     *         is {@code null} when no session has been set yet, e.g. right after the first
     *         registration because creation is deferred
     */
    public SessionImpl registerMN(Channel channel) {
        clients.computeIfAbsent(channel.attr(HJ212Data.MN_ATTR).get(), mn -> {
            getScheduler().schedule(() -> {
                SessionImpl session = new SessionImpl(channel, threadPoolExecutor, scheduledThreadPoolExecutor);
                channel.attr(SESSION_KEY).set(session);
                session.startInit();
            }, 500, TimeUnit.MILLISECONDS);
            return channel;
        });
        return channel.attr(SESSION_KEY).get();
    }

    /**
     * Starts tracking a connection under its channel id.
     *
     * @param channel the connection to track
     */
    public void addConnectionEntry(Channel channel) {
        connections.put(channel.id(), channel);
    }

    /**
     * Stops tracking a connection and, if that channel is the one registered for its MN,
     * removes the MN registration as well.
     *
     * @param remotingConnectionID the id the connection was added under
     * @return the removed channel, or {@code null} if no connection was tracked under that id
     */
    public Channel removeConnection(final Object remotingConnectionID) {
        Channel removedChannel = connections.remove(remotingConnectionID);
        if (removedChannel == null) {
            // Unknown or already-removed id: there is no channel whose MN could be cleaned up.
            return null;
        }
        // Conditional remove: registerMN keeps the first channel for an MN, so a later channel
        // carrying the same MN must not evict the registration of the channel that owns it.
        Optional.ofNullable(removedChannel.attr(HJ212Data.MN_ATTR).get())
                .ifPresent(mn -> clients.remove(mn, removedChannel));
        return removedChannel;
    }

    /**
     * Creates the two executors. Both use daemon threads and log uncaught exceptions together
     * with the thread they occurred on.
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        // Scheduler with 10 core threads.
        scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(10, new ThreadFactoryBuilder()
                .setUncaughtExceptionHandler((t, ex) -> {
                    LOGGER.error("{}", t, ex);
                })
                .setNameFormat("nj212-init-%d")
                .setDaemon(true).build());
        // Fixed pool of 10 threads backed by an unbounded queue.
        threadPoolExecutor = new ThreadPoolExecutor(10, 10,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(), new ThreadFactoryBuilder()
                .setUncaughtExceptionHandler((t, ex) -> {
                    LOGGER.error("{}", t, ex);
                })
                .setNameFormat("response-executor-%d")
                .setDaemon(true).build());
    }

}
